[Hadoop] Distributed Processing Framework MapReduce (Part 3)

Parameter tuning (adjusting the number of Map Tasks and Reduce Tasks), MapReduce strengths and weaknesses, MapReduce programming (WordCount in Java, Combiner, Partitioner), enabling jobhistory

Posted by 李玉坤 on 2017-06-23

Parameter Tuning

Adjusting the Number of Map Tasks and Reduce Tasks

  • Number of Map Tasks
    • When reading input files, the InputFormat computes how the files are split
    • The split size is determined by three parameters
      • dfs.blocksize: the HDFS block size
      • mapreduce.input.fileinputformat.split.minsize: the minimum split size in bytes
      • mapreduce.input.fileinputformat.split.maxsize: the maximum split size in bytes
      • The formula:
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}
  • Number of Reduce Tasks
    • The default number of Reduce Tasks per job is controlled by mapreduce.job.reduces
    • It can also be set per job with Job.setNumReduceTasks(int number) (see the sketch below)
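
As a concrete illustration (not from the original post; the class name TuningDemo is made up), the snippet below shows where these knobs live in the Java API: the split-size bounds are ordinary configuration properties, and the reduce count can be set either through mapreduce.job.reduces or per job via setNumReduceTasks. With the default bounds the formula above simply returns the HDFS block size, so a 1 GB file on 128 MB blocks yields 8 map tasks.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TuningDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // split-size bounds used by computeSplitSize() above (bytes)
        conf.setLong("mapreduce.input.fileinputformat.split.minsize", 1L);
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);
        // job-wide default number of reduce tasks
        conf.setInt("mapreduce.job.reduces", 2);

        Job job = Job.getInstance(conf, "split-tuning-demo");
        // per-job override of mapreduce.job.reduces
        job.setNumReduceTasks(2);
    }
}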

Fault-Tolerance Parameter Tuning

Speculative-Execution Parameter Tuning

Memory Parameter Tuning
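
The original leaves these three subsections as headings only. As a rough reference (a minimal sketch: the property names are from stock Hadoop 2.x, the values are only the usual defaults, and the helper class JobTuning is made up), these are the knobs each subsection typically refers to:

import org.apache.hadoop.conf.Configuration;

public class JobTuning {
    public static Configuration tuned() {
        Configuration conf = new Configuration();
        // Fault tolerance: attempts allowed before a task is marked as failed
        conf.setInt("mapreduce.map.maxattempts", 4);
        conf.setInt("mapreduce.reduce.maxattempts", 4);
        // Speculative execution: launch backup attempts for unusually slow tasks
        conf.setBoolean("mapreduce.map.speculative", true);
        conf.setBoolean("mapreduce.reduce.speculative", true);
        // Memory: container size (MB) requested for each map / reduce task
        conf.setInt("mapreduce.map.memory.mb", 1024);
        conf.setInt("mapreduce.reduce.memory.mb", 1024);
        return conf;
    }
}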

MapReduce Strengths and Weaknesses

  • MapReduce strengths
    • Simple model
      • Map + Reduce
    • Highly scalable
      • Supports horizontal scaling
    • Flexible
      • Handles structured and unstructured data
    • Fast
      • High-throughput offline processing
    • Parallel
      • The programming model is inherently parallel
    • Strong fault tolerance
  • Weaknesses
    • Streaming data: the MapReduce processing model requires static input data
    • Real-time computation: not suited to low-latency workloads that need millisecond responses
    • Complex algorithms: e.g. SVM (support vector machines)
    • Iterative computation: e.g. the Fibonacci sequence

MapReduce Programming

For environment setup, see the HDFS programming post.

WordCount in Java

package com.kun.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * WordCount application developed with MapReduce
 */
public class WordCountApp {

    /**
     * Map: read the input file line by line
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // each line of input received
            String line = value.toString();

            // split on the given delimiter
            String[] words = line.split(" ");

            for (String word : words) {
                // emit the map output through the context
                context.write(new Text(word), one);
            }
        }
    }

    /**
     * Reduce: merge/aggregate
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

            long sum = 0;
            for (LongWritable value : values) {
                // total number of occurrences of this key
                sum += value.get();
            }

            // emit the final count
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * Driver: packages all the information about the MapReduce job
     */
    public static void main(String[] args) throws Exception {

        // create the Configuration
        Configuration configuration = new Configuration();

        // create the Job
        Job job = Job.getInstance(configuration, "wordcount");

        // set the job's main class
        job.setJarByClass(WordCountApp.class);

        // set the job's input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // map settings
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // reduce settings
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // set the job's output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Developing WordCount with IDEA + Maven:
1) Develop the code
2) Build: mvn clean package -DskipTests
3) Upload to the server: scp target/hadoop-train-XXX.jar
4) Run:
hadoop jar /home/hadoop/lib/hadoop-train-1.0.jar com.kun.hadoop.mapreduce.WordCountApp hdfs://hadoop:9000/hello.txt hdfs://hadoop:9000/output/wc

Notes:

  1. Re-running the same code and script fails with:
    security.UserGroupInformation:
    PriviledgedActionException as:hadoop (auth:SIMPLE) cause:
    org.apache.hadoop.mapred.FileAlreadyExistsException:
    Output directory hdfs://hadoop:9000/output/wc already exists
    Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException:
    Output directory hdfs://hadoop:9000/output/wc already exists
  2. In MapReduce, the output directory must not already exist
    1) Either delete the output directory manually from the shell first:
    hadoop fs -rm -r /output/wc
    2) Or have the code delete it automatically (recommended):
    add the following in main, right below the "// create the Configuration" line
    // clean up the output directory if it already exists
    // (requires: import org.apache.hadoop.fs.FileSystem;)
    Path outputPath = new Path(args[1]);
    FileSystem fileSystem = FileSystem.get(configuration);
    if (fileSystem.exists(outputPath)) {
        fileSystem.delete(outputPath, true);
        System.out.println("output directory already exists, deleted it");
    }

Combiner

  • A local, map-side reducer
  • Reduces the volume of Map Task output and the amount of data transferred over the network

  • Combiner example
package com.kun.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * WordCount application developed with MapReduce, using a Combiner
 */
public class CombinerApp {

    /**
     * Map: read the input file line by line
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // each line of input received
            String line = value.toString();

            // split on the given delimiter
            String[] words = line.split(" ");

            for (String word : words) {
                // emit the map output through the context
                context.write(new Text(word), one);
            }
        }
    }

    /**
     * Reduce: merge/aggregate
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

            long sum = 0;
            for (LongWritable value : values) {
                // total number of occurrences of this key
                sum += value.get();
            }

            // emit the final count
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * Driver: packages all the information about the MapReduce job
     */
    public static void main(String[] args) throws Exception {

        // create the Configuration
        Configuration configuration = new Configuration();

        // clean up the output directory if it already exists
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
            System.out.println("output directory already exists, deleted it");
        }

        // create the Job
        Job job = Job.getInstance(configuration, "wordcount");

        // set the job's main class
        job.setJarByClass(CombinerApp.class);

        // set the job's input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // map settings
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // reduce settings
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // set the combiner class on the job; here its logic is identical to the reducer
        job.setCombinerClass(MyReducer.class);

        // set the job's output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

While the job runs, the job counters (Combine input records / Combine output records) show whether the combine actually happened.

hadoop jar /home/hadoop/lib/hadoop-train-1.0.jar com.kun.hadoop.mapreduce.CombinerApp hdfs://hadoop:9000/hello.txt hdfs://hadoop:9000/output/wc

When to use a Combiner:
sums, counts: yes
averages: no (see the sketch below)
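
Averages break because the combiner may run on any subset of a key's values: averaging partial averages (avg(1,2)=1.5 and avg(3)=3 combine to 2.25) is not the same as avg(1,2,3)=2. A minimal sketch of the usual workaround follows; it is not from the original post and assumes the mapper emits Text values of the form "value,1", so the combiner only merges (sum, count) pairs and leaves the division to the final reducer.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AvgCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (Text v : values) {
            // each value is a partial "sum,count" pair
            String[] parts = v.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        // still a partial result: only the final reducer divides sum by count
        context.write(key, new Text(sum + "," + count));
    }
}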

Partitioner


Requirement: send phones of the same brand to the same reducer.

xiaomi 200
huawei 300
xiaomi 100
huawei 200
iphone7 300
iphone7 500
nokia 20

package com.kun.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ParititonerApp {

    /**
     * Map: read the input file line by line
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // each line of input received
            String line = value.toString();

            // split on the given delimiter
            String[] words = line.split(" ");

            // emit (brand, count)
            context.write(new Text(words[0]), new LongWritable(Long.parseLong(words[1])));
        }
    }

    /**
     * Reduce: merge/aggregate
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

            long sum = 0;
            for (LongWritable value : values) {
                // total for this key
                sum += value.get();
            }

            // emit the final total
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * Partitioner: route each brand to its own partition / reducer
     */
    public static class MyPartitioner extends Partitioner<Text, LongWritable> {

        @Override
        public int getPartition(Text key, LongWritable value, int numPartitions) {

            if (key.toString().equals("xiaomi")) {
                return 0;
            }

            if (key.toString().equals("huawei")) {
                return 1;
            }

            if (key.toString().equals("iphone7")) {
                return 2;
            }

            return 3;
        }
    }

    /**
     * Driver: packages all the information about the MapReduce job
     */
    public static void main(String[] args) throws Exception {

        // create the Configuration
        Configuration configuration = new Configuration();

        // clean up the output directory if it already exists
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
            System.out.println("output directory already exists, deleted it");
        }

        // create the Job
        Job job = Job.getInstance(configuration, "wordcount");

        // set the job's main class
        job.setJarByClass(ParititonerApp.class);

        // set the job's input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // map settings
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // reduce settings
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // set the job's partitioner
        job.setPartitionerClass(MyPartitioner.class);
        // 4 reducers, one per partition
        job.setNumReduceTasks(4);

        // set the job's output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Run the Partitioner job:
hadoop jar /home/hadoop/lib/hadoop-train-1.0.jar com.kun.hadoop.mapreduce.ParititonerApp hdfs://hadoop:9000/partitioner hdfs://hadoop:9000/output/partitioner
As shown in the figure below, the results end up in four partition files, one per reducer.

When to use a Partitioner:
Data skew can be handled by partitioning the data across reducers.

Enabling jobhistory

  • Records information about completed MapReduce jobs to a specified HDFS directory
  • Disabled by default

Edit hadoop_home/etc/hadoop/mapred-site.xml:

<property>
  <name>mapreduce.jobhistory.address</name>
  <value>hadoop:10020</value>
</property>

<property>
  <name>mapreduce.jobhistory.webapp.address</name>
  <value>hadoop:19888</value>
</property>

<property>
  <name>mapreduce.jobhistory.done-dir</name>
  <value>/history/done</value>
</property>

<property>
  <name>mapreduce.jobhistory.intermediate-done-dir</name>
  <value>/history/done_intermediate</value>
</property>

Restart the YARN cluster.
Start jobhistory:
mr-jobhistory-daemon.sh start historyserver

You can now open the JobHistoryServer web UI.

The screenshots below walk through reaching the JobHistoryServer UI: click FINISHED, then click a completed job.
[YARN UI]

Click History
[YARN UI]

This opens the History page (the JobHistoryServer UI)

[JobHistoryServer UI]

Troubleshooting:
History must be configured; once it is, you can click through to the JobHistoryServer UI. However, clicking [logs] in the JobHistoryServer UI raises the error shown in [Error Figure 1]; after fixing that, clicking [logs] in the YARN UI again raises [Error Figure 2].

[Error Figure 1]

After fixing the JobHistoryServer issue above, clicking logs in the YARN UI may still show the error below; fix it by configuring the yarn.log.server.url path.
[Error Figure 2]

Edit hadoop_home/etc/hadoop/yarn-site.xml and add:

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>

<property>
  <name>yarn.log.server.url</name>
  <value>http://hadoop:19888/jobhistory/logs</value>
</property>

Restart the JobHistoryServer and the YARN cluster:
stop-yarn.sh
mr-jobhistory-daemon.sh stop historyserver
start-yarn.sh
mr-jobhistory-daemon.sh start historyserver
Go to hadoop_home/share/hadoop/mapreduce and run one of the bundled examples;
the logs now display without errors.
[root@hadoop mapreduce]# hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.14.4.jar pi 2 3